package com.stars.easyms.schedule.mq.activemq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.stars.easyms.schedule.client.DistributedTaskExecutionResult;
import com.stars.easyms.schedule.client.DistributedTaskExecutionResultWrapper;
import com.stars.easyms.schedule.exception.DistributedScheduleMQException;
import com.stars.easyms.schedule.mq.AbstractMQListener;
import com.stars.easyms.schedule.util.StringUtils;

import javax.jms.*;

/**
 * ActiveMQ监听类
 *
 * @author guoguifang
 */
public class ActiveMQListener extends AbstractMQListener {

    private String key;
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private volatile boolean running;

    @Override
    public synchronized void start() throws DistributedScheduleMQException {
        if (this.running) {
            if (logger.isDebugEnabled()) {
                logger.debug("The ActiveMQ listener({}) is already running!", key);
            }
            return;
        }
        try {
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, 2);
            Destination destination = this.session.createQueue(key);
            MessageConsumer consumer = this.session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    logger.info("The ActiveMQ listener({}) receive message [{}].", key, JSON.toJSONString(message));
                    if (message instanceof TextMessage) {
                        TextMessage tm = (TextMessage) message;
                        String text;
                        try {
                            text = tm.getText();
                        } catch (JMSException e) {
                            logger.error("The ActiveMQ listener(" + key + ") receive TextMessage [" + JSON.toJSONString(message) + "] conversion failure!", e);
                            return;
                        }
                        if (StringUtils.isBlank(text)) {
                            logger.error("The ActiveMQ listener(" + key + ") receive TextMessage [" + JSON.toJSONString(message) + "] is blank!");
                            return;
                        }
                        DistributedTaskExecutionResult distributedTaskExecutionResult;
                        try {
                            distributedTaskExecutionResult = JSON.parseObject(text, new TypeReference<DistributedTaskExecutionResult>() {}, Feature.SupportNonPublicField);
                        } catch (Exception e) {
                            logger.error("The ActiveMQ listener(" + key + ") receive message [" + text + "] convert class 'DistributedTaskExecutionResult' failure!", e);
                            return;
                        }
                        if (distributedTaskExecutionResult == null) {
                            logger.error("The ActiveMQ listener(" + key + ") receive message [" + text + "] is invalid!");
                            return;
                        }
                        DistributedTaskExecutionResultWrapper distributedTaskExecutionResultWrapper = new DistributedTaskExecutionResultWrapper(distributedTaskExecutionResult);
                        if (distributedTaskExecutionResultWrapper.getId() == null || distributedTaskExecutionResultWrapper.getVersion() == null) {
                            logger.error("The ActiveMQ listener(" + key + ") receive message [" + distributedTaskExecutionResult + "] id or version is null!");
                            return;
                        }
                        updateSubTaskStatus(distributedTaskExecutionResultWrapper);
                    } else {
                        logger.error("The ActiveMQ listener(" + key + ") received message type is invalid!");
                    }
                }
            });
        } catch (Exception e) {
            this.close();
            throw new DistributedScheduleMQException("The ActiveMQ listener(" + key + ") destination start up failure!", e);
        }
        this.running = true;
        logger.info("The ActiveMQ listener({}) destination start up success!", key);
    }

    @Override
    public synchronized void shutdown() {
        if (!this.running) {
            if (logger.isDebugEnabled()) {
                logger.debug("The ActiveMQ listener({}) is not started!", key);
            }
            return;
        }
        this.close();
        logger.info("The ActiveMQ listener({}) shut down success!", key);
    }

    private synchronized void close() {
        if (this.session != null) {
            try {
                this.session.close();
            } catch (Exception e) {
                logger.error("The ActiveMQ listener(" + key + ") session close failure!", e);
            }
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (Exception e) {
                logger.error("The ActiveMQ listener(" + key + ") connection close failure!", e);
            }
        }
        this.session = null;
        this.connection = null;
        this.running = false;
    }

    @Override
    public synchronized boolean isRunning() {
        return this.running;
    }

    public ActiveMQListener(String brokerUrl, String user, String password, String key) {
        this.connectionFactory = BasicActiveMQConnectionFactory.getActiveMQConnectionFactory(brokerUrl, user, password);
        this.key = key;
    }
}
